查看原文
其他

Pulsar Functions Worker 的选举机制

Growth ApachePulsar 2021-10-18

🎙️阅读本文需要约 6 分钟


摘要


在 Pulsar 中,Function、Source 和 Sink 都是运行在 Function Worker 上的,关于 Function 的内容可以参考一篇文章了解 Pulsar Functions


关于 Source 和 Sink 的使用,可以参考以下文章:本文介绍了 Functions Worker 的选举机制。阅读本文可以对 Functions Worker 有更加深入的理解。


Function Worker 架构


以下是 Pulsar Functions Worker 的架构图:



架构图中的各个组件 Metadata Mananger、Scheduler Manager、Runtime Manager 和 Membership Manager 都是运行在 Worker 上的。

Worker 的选举机制

  • 蓝色背景的是 Worker
  • 椭圆浅绿色的是订阅,也是一个 Consumer
  • 方形绿色背景的是 Broker
  • 方形红色背景的是 Topic


我们来了解下 Worker 的选举:
  1. 图中启动了三个 Worker,名称分别是 Worker Service1,Worker Service2,Worker Service3。这些 Worker 可以在同一台机器上,也可以在不同的机器。
  2. 启动时,每个 Worker 内部都会启动一个 Consumer,该 Consumer 用来进行选举,每个 Worker 的 id,主机名以及端口号会和该 Consumer 绑定。
  3. 该 Consumer 基于 Failover 模式启动(关于 Consumer 的 Failover 订阅模式可以参考这里 http://pulsar.apache.org/docs/en/concepts-messaging/#failover。)连接到同一个 Topic,具有相同订阅名称的 Consumer 中同一时刻只有一个处于活跃状态。
  4. 使用该 Consumer 的 Worker 就是 Leader,它负责进行调度并处理一些其他操作。


Worker Service1 和 Worker Service2 为一个集群,同时连接到 Topic1, 具有相同的订阅名称 sub,Leader 会在它们两个 Worker 中产生。
Worker Service3 为另一个集群,连接到 Topic2,订阅名称为 sub3,因为只有一个 Worker,所以它本身就是 Leader。


测试选举过程


下面我们来实践选举过程。在搭建本次试验前,需要在电脑上安装以下依赖,本次试验是在 Mac 系统上进行的测试。


Docker(https://github.com/streamnative/events/blob/master/blog-cn/docker.com):可以根据说明(https://www.docker.com/get-started)来完成 Docker 的安装。


  • 启动单机 Pulsar 服务


拉取 apachepulsar/pulsar:2.3.1 的镜像,然后使用 docker run 命令进行启动,-d 参数使该服务运行在后台模式,-it 以交互模式运行容器,并为容器分配一个伪输入终端。 --name 指定该容器使用 pulsar-standalone-function-leader 这个名字。


docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone-function-leader apachepulsar/pulsar:2.3.1 bin/pulsar standalone


1docker logs -f 
211:17:17.363 [pulsar-web-55-4] INFO  org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
311:17:17.369 [pulsar-web-55-4] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Created namespace public/default
411:17:17.370 [pulsar-web-55-4] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.2 - - [18/May/2019:11:17:17 +0000] "PUT /admin/v2/namespaces/public/default HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.3.1" 10
511:17:17.377 [pulsar-web-55-12] INFO  org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]


  • 模拟选举过程


Worker 的选举是基于 Consumer 的 Failover 模式,因此在本次测试中,直接启动 Consumer 来模拟选举。
如上选举图所示,本次测试会启动三个 Failover 模式的 Consumer,其中两个 Consumer 连接到 Topic1,使用的订阅名称为 sub,它们代表一个 Worker 集群;另一个 Consumer 连接到 Topic2,使用订阅名称 sub3 代表另一个 Worker 集群。


为了本次测试,需要开启四个窗口,分别命名为 window1,window2,window3,window4。window4 用来进行验证。


—— window1以 Failover 模式启动一个消费者,使用订阅名称 sub,订阅到 public/default/topic1。


1docker exec -it pulsar-standalone-function-leader /bin/bash
2./bin/pulsar-client consume persistent://public/default/topic1 --num-messages 0 --subscription-name sub -t Failover


—— window2

以 Failover 模式启动一个消费者,使用订阅名称 sub,订阅到 public/default/topic1。


1docker exec -it pulsar-standalone-function-leader /bin/bash
2./bin/pulsar-client consume persistent://public/default/topic1 --num-messages 0 --subscription-name sub -t Failover


—— window3

以 Failover 模式启动一个消费者,使用订阅名称 sub3,订阅到 public/default/topic2。


1docker exec -it pulsar-standalone-function-leader /bin/bash
2./bin/pulsar-client consume persistent://public/default/topic2 --num-messages 0 --subscription-name sub3 -t Failover


三个窗口的操作都使用了相同的订阅模式 Failover,window1 和 window2 使用相同的订阅名称订阅到同一个 Topic。


—— window4

通过命令 topics stats 来获取 topic1 的统计信息。


1./bin/pulsar-admin topics stats topic1
2{
3  "msgRateIn" : 0.0,
4  "msgThroughputIn" : 0.0,
5  "msgRateOut" : 0.0,
6  "msgThroughputOut" : 0.0,
7  "averageMsgSize" : 0.0,
8  "storageSize" : 0,
9  "publishers" : [ ],
10  "subscriptions" : {
11    "sub" : {
12      "msgRateOut" : 0.0,
13      "msgThroughputOut" : 0.0,
14      "msgRateRedeliver" : 0.0,
15      "msgBacklog" : 0,
16      "blockedSubscriptionOnUnackedMsgs" : false,
17      "unackedMessages" : 0,
18      "type" : "Failover",
19      "activeConsumerName" : "383dc",
20      "msgRateExpired" : 0.0,
21      "consumers" : [ {
22        "msgRateOut" : 0.0,
23        "msgThroughputOut" : 0.0,
24        "msgRateRedeliver" : 0.0,
25        "consumerName" : "383dc",
26        "availablePermits" : 1000,
27        "unackedMessages" : 0,
28        "blockedConsumerOnUnackedMsgs" : false,
29        "metadata" : { },
30        "address" : "/127.0.0.1:50014",
31        "connectedSince" : "2019-05-18T11:30:34.161Z",
32        "clientVersion" : "2.3.1"
33      }, {
34        "msgRateOut" : 0.0,
35        "msgThroughputOut" : 0.0,
36        "msgRateRedeliver" : 0.0,
37        "consumerName" : "51911",
38        "availablePermits" : 1000,
39        "unackedMessages" : 0,
40        "blockedConsumerOnUnackedMsgs" : false,
41        "metadata" : { },
42        "address" : "/127.0.0.1:50018",
43        "connectedSince" : "2019-05-18T11:30:42.742Z",
44        "clientVersion" : "2.3.1"
45      } ]
46    }
47  },
48  "replication" : { },
49  "deduplicationStatus" : "Disabled"
50}


在 Topic1 下出现了一个订阅,两个 Consumer,当前的 activeConsumerName 是 383dc,正是 window1 中的订阅。这说明当前持有该订阅的 Worker 为 Leader。当把 window1 的订阅关掉后,再看一下。在 window1 中,使用 CTRL + C 关掉该 Consumer,再回到 window4 查看订阅。


1./bin/pulsar-admin topics stats topic1
2{
3  "msgRateIn" : 0.0,
4  "msgThroughputIn" : 0.0,
5  "msgRateOut" : 0.0,
6  "msgThroughputOut" : 0.0,
7  "averageMsgSize" : 0.0,
8  "storageSize" : 0,
9  "publishers" : [ ],
10  "subscriptions" : {
11    "sub" : {
12      "msgRateOut" : 0.0,
13      "msgThroughputOut" : 0.0,
14      "msgRateRedeliver" : 0.0,
15      "msgBacklog" : 0,
16      "blockedSubscriptionOnUnackedMsgs" : false,
17      "unackedMessages" : 0,
18      "type" : "Failover",
19      "activeConsumerName" : "51911",
20      "msgRateExpired" : 0.0,
21      "consumers" : [ {
22        "msgRateOut" : 0.0,
23        "msgThroughputOut" : 0.0,
24        "msgRateRedeliver" : 0.0,
25        "consumerName" : "51911",
26        "availablePermits" : 1000,
27        "unackedMessages" : 0,
28        "blockedConsumerOnUnackedMsgs" : false,
29        "metadata" : { },
30        "connectedSince" : "2019-05-18T11:30:42.742Z",
31        "clientVersion" : "2.3.1",
32        "address" : "/127.0.0.1:50018"
33      } ]
34    }
35  },
36  "replication" : { },
37  "deduplicationStatus" : "Disabled"
38}


可以看到 window2 中启动的订阅被成功激活,当前的 activeConsumerName 是 51911,这时持有该 Consumer 的 Worker 便成为了 Leader。


总结


以上是在 Connector 运行 Worker 的选举机制,可以看到其非常巧妙的运用了 Consumer 的 Failover 模式,来实现 Worker 的高可用机制。


作者 | tuteng

审校 | Jennifer

编辑 | Irene




📣Join Pulsar 📣


Apache Pulsar 鼓励大家积极参与开源社区,欢迎大家积极提交 PR,具体 contribute 流程可以点击「阅读原文」查看哦~


: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存